import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.concat;

/**
 * Small Flink Table API example that appends a derived column to a table.
 *
 * <p>Three in-memory {@code Order} elements are registered as the temporary view
 * {@code Orders} with the fields {@code user}, {@code product} and {@code amount}.
 * A column {@code c} is then added by concatenating {@code product} with the literal
 * {@code "_discount"}, and the {@code user}, {@code product} and {@code c} columns
 * are printed.
 */
public class AddColumns {

    /**
     * Assembles the example pipeline and runs it.
     *
     * @param args command-line arguments; not read by this example
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Fixed sample input for the example.
        DataStream<Order> sampleOrders = streamEnv.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4)));

        // Expose the stream under the view name "Orders" with three named fields.
        tableEnv.createTemporaryView(
                "Orders", sampleOrders, $("user"), $("product"), $("amount"));

        Table ordersTable = tableEnv.from("Orders");

        // Add column "c" = product + "_discount", then keep only the columns to print.
        Table withDiscountLabel =
                ordersTable.addColumns(concat($("product"), "_discount").as("c"));
        Table projected = withDiscountLabel.select($("user"), $("product"), $("c"));

        DataStream<Row> rows = tableEnv.toAppendStream(projected, Row.class);
        rows.print();

        // Nothing runs until the environment is executed.
        streamEnv.execute();
    }
}
